Async persistance #488
👍 Looks good to me! Reviewed everything up to 9e233c8 in 1 minute and 20 seconds
More details
- Looked at 803 lines of code in 5 files
- Skipped 0 files when reviewing.
- Skipped posting 4 drafted comments based on config settings.
1. burr/core/persistence.py:55
- Draft comment: Ensure that the is_async method is overridden in subclasses of BaseStateLoader and BaseStateSaver to accurately reflect their async capabilities.
- Reason this comment was not posted: Comment did not seem useful.
2. burr/core/persistence.py:129
- Draft comment: Ensure that the is_async method is overridden in subclasses of BaseStateLoader and BaseStateSaver to accurately reflect their async capabilities.
- Reason this comment was not posted: Marked as duplicate.
3. burr/core/persistence.py:174
- Draft comment: Ensure that the is_async method is overridden in subclasses of BaseStateLoader and BaseStateSaver to accurately reflect their async capabilities.
- Reason this comment was not posted: Marked as duplicate.
4. burr/core/persistence.py:86
- Draft comment: Ensure that the is_async method is overridden in subclasses of BaseStateLoader and BaseStateSaver to accurately reflect their async capabilities.
- Reason this comment was not posted: Marked as duplicate.
Workflow ID: wflow_QbxcMoFBccy3mX8X
Looking good -- some broad comments. I'm falling on the side of making it a single builder to force sharing of code; IMO it makes it a bit simpler and duplicates less logic. That said, I see both ways of doing it.
About this:
To think about: (1) fire-and-forget or (2) blocking/transactional options --
Async persisters are naturally (1) and sync persisters are (2). If we want to have both options for both cases, we need to:
- effectively block the async save to turn (1) -> (2), and
- create an event loop / another thread and make the sync save a coroutine executing there to go from (2) -> (1).
My initial idea was to have that option as an attribute of the persister class and handle it within PersisterHook/PersisterHookAsync, but I'm still checking if there is a more natural place for this.
To clarify -- async persisters are not naturally fire-and-forget -- they'll await completion of the save, which holds up the awaiting coroutine (the application run) until it finishes. asyncio.ensure_future (I think that's the right one) is a way to make it fire-and-forget. You're right about the thread -- we'll need to tie into some sort of execution. This is also why we can probably push this up in the stack -- e.g. we don't use the result of these for the next execution, so we can put it in a queue.
We would have to worry about how to ensure order, especially in persisters (there's probably an async queue pattern that works -- should be pretty easy, but definitely more complicated). This also gets to some interesting db/distributed systems problems -- I'm not convinced there aren't other ugly pitfalls here.
So I think we should keep it transactional as it is now...
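To make the trade-off concrete, here is a minimal sketch of the three options discussed: awaiting each save (transactional), scheduling saves with asyncio.ensure_future (fire-and-forget), and pushing saves onto a queue drained by a single writer so order is preserved. The `save` coroutine and all function names are hypothetical stand-ins, not Burr's persister API.

```python
import asyncio


async def save(step: int, log: list) -> None:
    """Hypothetical stand-in for an async persister's save call."""
    await asyncio.sleep(0.01)  # pretend to write to a database
    log.append(step)


async def transactional(log: list) -> int:
    """Await each save: the caller does not move on until the write is done."""
    for step in range(3):
        await save(step, log)
    return len(log)  # all writes are finished at this point


async def fire_and_forget(log: list) -> int:
    """Schedule saves without awaiting them; the caller continues immediately."""
    tasks = [asyncio.ensure_future(save(step, log)) for step in range(3)]
    saved_so_far = len(log)  # nothing has been written yet
    await asyncio.gather(*tasks)  # only so this demo exits cleanly
    return saved_so_far


async def queued(log: list) -> list:
    """Non-blocking for the caller, but a single writer keeps saves in order."""
    queue: asyncio.Queue = asyncio.Queue()

    async def writer() -> None:
        while True:
            step = await queue.get()
            if step is None:  # sentinel: no more work
                return
            await save(step, log)

    worker = asyncio.ensure_future(writer())
    for step in range(3):
        queue.put_nowait(step)  # returns immediately
    queue.put_nowait(None)
    await worker
    return log
```

The queued variant still leaves open what happens if the process dies with unsaved items in the queue, which is one of the pitfalls that argues for keeping saves transactional.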
burr/core/application.py
Outdated
@@ -1174,6 +1184,22 @@ def iterate(
:return: Each iteration returns the result of running `step`. This generator also returns a tuple of
[action, result, current state]
"""
# This is a gentle warning for existing users
if self._adapter_set.async_hooks:
This check is good to raise -- only in .iterate though (unless I'm missing something?). Maybe take it and refactor out to a specific checking function, then put in the non-iterate functions as well? stream_result, step, run? There's some nuance about things that get called in both, so worth looking out for that.
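A rough sketch of what such a checking function could look like, assuming a hypothetical `SketchApp` class (not Burr's `Application`). Each public sync method validates once and then delegates to an unchecked private implementation, which is one way to handle the nuance of methods that call each other.

```python
import warnings


class SketchApp:
    """Hypothetical stand-in for the application class; not Burr's real API."""

    def __init__(self, async_hooks: list):
        self._async_hooks = async_hooks

    def _warn_if_async_hooks_skipped(self, method_name: str) -> None:
        # Single place for the check so every sync entry point behaves the same.
        if self._async_hooks:
            warnings.warn(
                f"{method_name}() is synchronous; "
                f"{len(self._async_hooks)} async hook(s) will not be called."
            )

    def _step(self) -> str:
        # Actual work, no validation, safe to call from other methods.
        return "stepped"

    def step(self) -> str:
        self._warn_if_async_hooks_skipped("step")
        return self._step()

    def run(self, n: int = 2) -> list:
        # Validates once, then uses the unchecked _step to avoid repeat warnings.
        self._warn_if_async_hooks_skipped("run")
        return [self._step() for _ in range(n)]
```

With this layout, `run` emits a single warning even though it performs several steps.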
burr/core/application.py
Outdated
)

# Seems fair to raise this if everything is async but the app execution
if self._adapter_set.async_hooks and isinstance(self._builder, AsyncApplicationBuilder):
Oh I see -- this makes sense. E.g. why would you ever call ApplicationBuilder for async stuff?
I'm thinking that we want to centralize validation:
- If any action is non-async -- we should maybe error out
- The other async stuff should call this validation
IMO we should error out when the application is built with .abuild(), has async hooks, and we run it with sync methods, e.g. .run(), since it is very easy to miss that the async hooks just don't get called.
Vice versa, if we have an async application with .arun() and sync hooks, I wouldn't error out, since we cannot guarantee that every adapter has async support and the user might be OK with some things being blocking. Maybe a warning here is more appropriate?
Yes, I agree -- this is backwards compatible. The second case is odd, actually, as you've pointed out. To make it more concrete, not every synchronous hook is incompatible in an async mode. Take, for example, the hook that prints "doing X step" before every step -- this is just going to be a sync hook, not an async hook, but it should work in an async setting. No need to build an async version of something like that, when it never has an await.
I think we should maintain the design where .arun() allows sync + async hooks, .run() only allows sync hooks (and should break otherwise, although that's an edge case...). Also, I think this simplifies things?
burr/core/application.py
Outdated
self.state_persister = persister # track for later
return self

async def __with_async_state_persister(self):
I wonder if we just want to push this into build for both sync/async?
burr/core/application.py
Outdated
results=result, final_state=state
out = (
action,
StreamingResultContainer.pass_through(results=result, final_state=state),
Formatting -- is there a reason it changed?
No, might have been my local settings..
@@ -2249,7 +2279,7 @@ def with_tracker(

def initialize_from(
So I like with_state_persister logic moving to build -- it mirrors this. Might be worth doing for sync side as well. Can always leave as a TODO. But it should simplify logic?
burr/core/application.py
Outdated
pass

@telemetry.capture_function_usage
async def build(self) -> Application[StateType]:
So this has duplicated logic from the synchronous one. I wonder if this is a reason to combine.
- _with_state_persister, _load_from_persister are helper functions for each
- _build_common() or something has shared logic. Everything after 2654
- build() has synchronous logic -- calls _build_common()
- abuild() has async logic -- also calls _build_common()
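A minimal sketch of that layout, using a hypothetical `SketchBuilder` and toy persisters (the real builder has many more fields, and the persister `load` signature here is an assumption). Only the persister interaction differs between the two entry points; everything else lives in `_build_common`.

```python
import asyncio


class SyncPersister:
    """Toy sync persister; load() is an assumed method name."""

    def load(self) -> dict:
        return {"count": 1}


class AsyncPersister:
    """Toy async persister with the same shape, but awaitable."""

    async def load(self) -> dict:
        await asyncio.sleep(0)
        return {"count": 2}


class SketchBuilder:
    """Hypothetical builder; not Burr's ApplicationBuilder."""

    def __init__(self, persister=None):
        self.persister = persister
        self.state = None

    def _build_common(self) -> dict:
        # Shared assembly that does not care how the state was loaded.
        return {"state": self.state or {}, "persister": self.persister}

    def build(self) -> dict:
        # Sync-specific part: blocking load.
        if self.persister is not None:
            self.state = self.persister.load()
        return self._build_common()

    async def abuild(self) -> dict:
        # Async-specific part: awaited load.
        if self.persister is not None:
            self.state = await self.persister.load()
        return self._build_common()
```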
burr/core/application.py
Outdated
self.state_persister = persister # track for later
return self

async def __with_async_state_persister(self):
nit -- naming conventions:
- no underscore -- public method
- one underscore -- private method
- two underscores -- name mangling, private for subclasses
- two underscores before/after -- system-level concerns
This should have a single underscore
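For reference, a small self-contained demonstration of why the double underscore matters in practice. The class and method names are made up for illustration; the mangling behavior itself is standard Python.

```python
class Builder:
    def build(self) -> str:
        # No underscore: public method.
        return "public"

    def _helper(self) -> str:
        # One underscore: private by convention, still reachable by subclasses.
        return "private by convention"

    def __mangled(self) -> str:
        # Two leading underscores: stored on the class as _Builder__mangled.
        return "name-mangled"


class Child(Builder):
    def call_helper(self) -> str:
        return self._helper()  # works as expected

    def call_mangled(self) -> str:
        # Inside Child this is rewritten to self._Child__mangled,
        # which does not exist, so it raises AttributeError.
        return self.__mangled()
```

A single-underscore helper avoids this surprise if the builder is ever subclassed.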
Looks really good -- as noted in a DM I think we have to align on behavior. E.g. when using .build() versus .abuild(), sync versus async persister, amethod versus method -- which configurations break/log warning/work? Cartesian product/table is a clean way to think of it.
self.state = State()

if self.state_persister:
await self._set_async_state_persister() # this is used to save the state during application run
I'm thinking that .abuild() should allow a sync persister but log a warning. But maybe not? Seems fair to force it here...
Scratch that, yes, that case should not work
This is a current working proposition of which cases should allow what. I think we can replace |
OK, yeah, I think this is the right way to do it! Nice work enumerating it, you have me convinced. The only case (and I'm not sure how this works in the one above) is when you have a synchronous hook in an async |
Looking good! Some small points:
- I think we should document this clearly. Ideally it is a sync versus async page in concepts, but this could just be part of the docstring
- A bit of a strange case in parallelism, where we call the new function by default. Might be worth adding in a parameter to bypass validation. Or just breaking -- it's an edge case.
Otherwise, let's ship soon! Nice work!
burr/core/application.py
Outdated
)

# Seems fair to raise this if everything is async but the app execution is sync
if self._adapter_set.async_hooks and self._builder.is_async:
Confused -- this is erroring out if the builder is async and the adapter has async hooks? .is_async?
Yes! So I am calling this in the sync methods and it covers cases 3 and 7 in the above table. Thinking back, maybe it's better to be strict from the get-go and only check _builder.is_async and error out.
Previously, if there were no async hooks present (i.e. no async persisters, etc.) it didn't really matter how the app was built or run, since there were only sync hooks in the app. But we might as well just say: if you build it async, use it async.
@@ -2484,3 +2589,51 @@ def build(self) -> Application[StateType]:
state_persister=self.state_persister,
state_initializer=self.state_initializer,
)

@telemetry.capture_function_usage
Nice this function is extremely clean
burr/core/application.py
Outdated
async def abuild(self) -> Application[StateType]:
"""Builds the application.

This function is a bit messy as we iron out the exact logic and rigor we want around things.
No longer messy, fix this. And make it + build() clear about the parameters (good place to have this table?).
burr/core/persistence.py
Outdated
status: Literal["completed", "failed"],
**kwargs,
):
# print("I saved something.")
remove
burr/core/parallelism.py
Outdated
else:
builder = builder.with_entrypoint(self.graph.entrypoint).with_state(self.state)

return await builder.abuild()
OK, complexity, this has the off-chance of making something backwards incompatible. That said, I think that's OK TBH. In this case it's an odd one:
- Someone is using the async parallelism piece
- Someone is not using the async persister (which they won't be, it isn't there)
- It breaks on reloading
I think very few people are doing this, but it might be worth having a parameter in .abuild() to bypass validation, we could do that here...
We can add the parameter, but I'm not sure it is that helpful. If they are using sync persisters they will have to go through the sync .build() method anyway, since .abuild() expects async persisters.
This is what I capture now in parallelism .arun(): we check if we have sync persisters and then use .build() to keep that backward compatibility; otherwise everything is async and OK.
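Roughly, the fallback described here could be sketched as follows. Everything in this block is hypothetical (`SketchBuilder`, `create_app`, and the toy persisters); the only thing taken from the discussion is the idea of an `is_async` check that decides between `.build()` and `.abuild()`.

```python
import asyncio


class SyncPersister:
    def is_async(self) -> bool:
        return False


class AsyncPersister:
    def is_async(self) -> bool:
        return True


class SketchBuilder:
    """Hypothetical builder exposing both a sync and an async build."""

    def __init__(self, persisters: list):
        self.persisters = persisters

    def build(self) -> str:
        return "built with build()"

    async def abuild(self) -> str:
        return "built with abuild()"


async def create_app(builder: SketchBuilder) -> str:
    # Backward-compatible path: any sync persister forces the sync build,
    # because the async build is assumed to expect async persisters only.
    if any(not p.is_async() for p in builder.persisters):
        return builder.build()
    return await builder.abuild()
```

Existing users of the async parallelism constructs with sync persisters would then keep the old code path, while fully async setups go through the async build.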
This works as before. You run both sync and async hooks --> sync ones are blocking. Also, if you use sync persister with Will document! |
5008a12
to
b7ba29f
Compare
Nice work! Really big improvements. Left some small comments but it's there.
burr/core/application.py
Outdated
# this application is meant to be run in async mode.
if self._builder and self._builder.is_async:
raise ValueError(
"The application was build with async hooks "
Make this say "it was built with abuild" -- think that's clearer/more actionable
Sync vs Async Applications
===========================

TL;DR
Link to the arun, aiterate, etc... methods. Good to tie it back into the actions.
Adding async support for persistance and refactoring builder:
- classes for building async persistance adapters / hooks
- builder extended to include async initializer/persister, async build
- builder refactored
- application added async validation, warning/error when async hooks not invoked
- automatic built for app in parallelism made backward compatible
Prototyping and testing async persisters:
- Add AsyncDevNull and AsyncInMemory persisters for tests
- Added support for async sqlite persister
- Test Async persister interface, async builder, async application
Docs explaining allowed cases and deprecation warnings.
b7ba29f
to
ea23c98
Compare
Addressing #484 to have an async persistence interface.
Changes
.abuild()
To think about: (1) fire-and-forget or (2) blocking/transactional options --
Async persisters are naturally (1) and sync persisters are (2). If we want to have both options for both cases, we need to:
- effectively block the async save to turn (1) -> (2), and
- create an event loop / another thread and make the sync save a coroutine executing there to go from (2) -> (1).
My initial idea was to have that option as an attribute of the persister class and handle it within PersisterHook/PersisterHookAsync, but I'm still checking if there is a more natural place for this.
How I tested this
Notes
This part is very much for discussion if we want to have another AsyncApplicationBuilder or put this into the existing ApplicationBuilder. My arguments for having two:
- with_state_persister, _load_from_persister, and build.
- with_state_persister is async it gets a bit hairy how to do method chaining -- what I did is to overwrite the original method to just store the state_persister (similar to what initialize_from does) and then pushed all the logic into an async helper function __with_async_state_persister that gets awaited in build to manually chain coroutines.
- run is used instead of arun.
Having said all of that, I also can push those methods into the existing builder with an abuild() to follow the pattern in the Application class where both sync and async are side-by-side.
Checklist
Important
Add asynchronous persistence support with AsyncApplicationBuilder and related async classes and tests.
- AsyncApplicationBuilder to support asynchronous state persistence.
- create_async_app in parallelism.py for async application creation.
- AsyncDevNullPersister for testing async persistence.
- AsyncBaseStateLoader and AsyncBaseStateSaver in persistence.py for async state operations.
- PersisterHookAsync for async lifecycle hooks.
- test_application.py, including test_async_save_and_load_from_persister_end_to_end.
- __init__.py and application.py to include async classes.
This description was created by Ellipsis for 9e233c8. It will automatically update as commits are pushed.